/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.iceberg.spark.source;

import static org.apache.iceberg.ManifestContent.DATA;
import static org.apache.iceberg.ManifestContent.DELETES;
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Path;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.Files;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.actions.DeleteOrphanFiles;
import org.apache.iceberg.actions.RewriteManifests;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.FileHelpers;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.deletes.PositionDeleteWriter;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.mapping.MappingUtil;
import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.mapping.NameMappingParser;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.iceberg.spark.SparkSQLProperties;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.SparkTableUtil;
import org.apache.iceberg.spark.SparkWriteOptions;
import org.apache.iceberg.spark.TestBase;
import org.apache.iceberg.spark.actions.SparkActions;
import org.apache.iceberg.spark.data.TestHelpers;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.Pair;
import org.apache.spark.SparkException;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.types.StructType;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

public abstract class TestIcebergSourceTablesBase extends TestBase {

  // Base two-column schema used by most tests: (id int, data string).
  private static final Schema SCHEMA =
      new Schema(
          optional(1, "id", Types.IntegerType.get()), optional(2, "data", Types.StringType.get()));

  // SCHEMA plus an extra "category" column (field id 3); presumably used by
  // schema-evolution scenarios later in the file.
  private static final Schema SCHEMA2 =
      new Schema(
          optional(1, "id", Types.IntegerType.get()),
          optional(2, "data", Types.StringType.get()),
          optional(3, "category", Types.StringType.get()));

  // SCHEMA2 without the "data" column; original field ids 1 and 3 are kept.
  private static final Schema SCHEMA3 =
      new Schema(
          optional(1, "id", Types.IntegerType.get()),
          optional(3, "category", Types.StringType.get()));

  // Identity partitioning on "id", applied wherever a partitioned table is needed.
  private static final PartitionSpec SPEC = PartitionSpec.builderFor(SCHEMA).identity("id").build();

  // Scratch directory; used as the location for external Spark tables created in tests.
  @TempDir protected Path temp;

  // Creates a table with the given schema, spec, and properties in the catalog under test.
  public abstract Table createTable(
      TableIdentifier ident, Schema schema, PartitionSpec spec, Map<String, String> properties);

  // Loads a metadata table (suffix e.g. "entries", "files") for the given identifier.
  public abstract Table loadTable(TableIdentifier ident, String entriesSuffix);

  // Returns the Spark source location string for a metadata table of the given table.
  public abstract String loadLocation(TableIdentifier ident, String entriesSuffix);

  // Returns the Spark source location string for the table itself.
  public abstract String loadLocation(TableIdentifier ident);

  // Drops the table from the catalog under test.
  public abstract void dropTable(TableIdentifier ident) throws IOException;

  @AfterEach
  public void removeTable() {
    // Clean up the helper Spark table that the import-based tests create.
    String dropParquetTable = "DROP TABLE IF EXISTS parquet_table";
    spark.sql(dropParquetTable);
  }

  // Convenience overload: creates a table with no extra table properties.
  private Table createTable(TableIdentifier ident, Schema schema, PartitionSpec spec) {
    return createTable(ident, schema, spec, ImmutableMap.of());
  }

  @Test
  public synchronized void testTablesSupport() {
    // Round-trip: append three rows through the Iceberg source and read them back.
    TableIdentifier tableIdentifier = TableIdentifier.of("db", "table");
    createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned());

    List<SimpleRecord> expectedRecords =
        Lists.newArrayList(
            new SimpleRecord(1, "1"), new SimpleRecord(2, "2"), new SimpleRecord(3, "3"));

    String location = loadLocation(tableIdentifier);
    Dataset<Row> sourceDf = spark.createDataFrame(expectedRecords, SimpleRecord.class);
    sourceDf.select("id", "data").write().format("iceberg").mode(SaveMode.Append).save(location);

    Dataset<Row> readBack = spark.read().format("iceberg").load(location);
    List<SimpleRecord> actualRecords =
        readBack.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();

    assertThat(actualRecords).as("Records should match").isEqualTo(expectedRecords);
  }

  @Test
  public void testEntriesTable() throws Exception {
    // Verifies the "entries" metadata table by comparing its rows against the raw
    // manifest Avro file read directly with the entries table's schema.
    TableIdentifier tableIdentifier = TableIdentifier.of("db", "entries_test");
    Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned());
    Table entriesTable = loadTable(tableIdentifier, "entries");

    List<SimpleRecord> records = Lists.newArrayList(new SimpleRecord(1, "1"));

    Dataset<Row> inputDf = spark.createDataFrame(records, SimpleRecord.class);
    inputDf
        .select("id", "data")
        .write()
        .format("iceberg")
        .mode("append")
        .save(loadLocation(tableIdentifier));

    table.refresh();

    // Read the metadata table, keeping only the non-derived columns for comparison.
    Dataset<Row> entriesTableDs =
        spark.read().format("iceberg").load(loadLocation(tableIdentifier, "entries"));
    List<Row> actual = TestHelpers.selectNonDerived(entriesTableDs).collectAsList();

    Snapshot snapshot = table.currentSnapshot();

    assertThat(snapshot.allManifests(table.io())).as("Should only contain one manifest").hasSize(1);

    // Build the expected rows by reading the single manifest file directly.
    InputFile manifest = table.io().newInputFile(snapshot.allManifests(table.io()).get(0).path());
    List<GenericData.Record> expected = Lists.newArrayList();
    try (CloseableIterable<GenericData.Record> rows =
        Avro.read(manifest).project(entriesTable.schema()).build()) {
      // each row must inherit snapshot_id and sequence_number
      rows.forEach(
          row -> {
            row.put(2, 1L); // data sequence number
            row.put(3, 1L); // file sequence number
            GenericData.Record file = (GenericData.Record) row.get("data_file");
            TestHelpers.asMetadataRecord(file);
            expected.add(row);
          });
    }

    assertThat(expected).as("Entries table should have one row").hasSize(1);
    assertThat(actual).as("Actual results should have one row").hasSize(1);
    TestHelpers.assertEqualsSafe(
        TestHelpers.nonDerivedSchema(entriesTableDs), expected.get(0), actual.get(0));
  }

  @Test
  public void testEntriesTablePartitionedPrune() {
    // Selecting only "status" from the entries table of a partitioned table should
    // return exactly one row with status Added (1).
    TableIdentifier tableIdentifier = TableIdentifier.of("db", "entries_test");
    Table table = createTable(tableIdentifier, SCHEMA, SPEC);

    Dataset<Row> inputDf =
        spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "1")), SimpleRecord.class);
    inputDf
        .select("id", "data")
        .write()
        .format("iceberg")
        .mode("append")
        .save(loadLocation(tableIdentifier));

    table.refresh();

    Dataset<Row> statusDs =
        spark
            .read()
            .format("iceberg")
            .load(loadLocation(tableIdentifier, "entries"))
            .select("status");
    List<Row> actual = statusDs.collectAsList();

    assertThat(actual).as("Results should contain only one status").hasSize(1);
    assertThat(actual.get(0).getInt(0)).as("That status should be Added (1)").isEqualTo(1);
  }

  @Test
  public void testEntriesTableDataFilePrune() {
    // Projecting one nested field (data_file.file_path) from the entries table must
    // prune the data_file struct down to just that element.
    TableIdentifier tableIdentifier = TableIdentifier.of("db", "entries_test");
    Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned());

    Dataset<Row> inputDf =
        spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "1")), SimpleRecord.class);
    inputDf
        .select("id", "data")
        .write()
        .format("iceberg")
        .mode("append")
        .save(loadLocation(tableIdentifier));

    table.refresh();
    DataFile file = table.currentSnapshot().addedDataFiles(table.io()).iterator().next();

    Dataset<Row> prunedDs =
        spark
            .read()
            .format("iceberg")
            .load(loadLocation(tableIdentifier, "entries"))
            .select("data_file.file_path");
    List<Object[]> singleActual = rowsToJava(prunedDs.collectAsList());

    List<Object[]> singleExpected = ImmutableList.of(row(file.path()));

    assertEquals(
        "Should prune a single element from a nested struct", singleExpected, singleActual);
  }

  @Test
  public void testEntriesTableDataFilePruneMulti() {
    // Projecting several nested fields of data_file at once should prune the struct
    // to exactly those elements.
    TableIdentifier tableIdentifier = TableIdentifier.of("db", "entries_test");
    Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned());

    List<SimpleRecord> records = Lists.newArrayList(new SimpleRecord(1, "1"));

    Dataset<Row> inputDf = spark.createDataFrame(records, SimpleRecord.class);
    inputDf
        .select("id", "data")
        .write()
        .format("iceberg")
        .mode("append")
        .save(loadLocation(tableIdentifier));

    table.refresh();
    DataFile file = table.currentSnapshot().addedDataFiles(table.io()).iterator().next();

    List<Object[]> multiActual =
        rowsToJava(
            spark
                .read()
                .format("iceberg")
                .load(loadLocation(tableIdentifier, "entries"))
                .select(
                    "data_file.file_path",
                    "data_file.value_counts",
                    "data_file.record_count",
                    "data_file.column_sizes")
                .collectAsList());

    List<Object[]> multiExpected =
        ImmutableList.of(
            row(file.path(), file.valueCounts(), file.recordCount(), file.columnSizes()));

    // Message corrected: this test prunes multiple elements (copy-paste from the
    // single-field variant previously said "a single element").
    assertEquals(
        "Should prune multiple elements from a nested struct", multiExpected, multiActual);
  }

  @Test
  public void testFilesSelectMap() {
    // Selecting several top-level columns (including a map column, value_counts)
    // from the files metadata table should prune each row to those columns.
    TableIdentifier tableIdentifier = TableIdentifier.of("db", "entries_test");
    Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned());

    List<SimpleRecord> records = Lists.newArrayList(new SimpleRecord(1, "1"));

    Dataset<Row> inputDf = spark.createDataFrame(records, SimpleRecord.class);
    inputDf
        .select("id", "data")
        .write()
        .format("iceberg")
        .mode("append")
        .save(loadLocation(tableIdentifier));

    table.refresh();
    DataFile file = table.currentSnapshot().addedDataFiles(table.io()).iterator().next();

    List<Object[]> multiActual =
        rowsToJava(
            spark
                .read()
                .format("iceberg")
                .load(loadLocation(tableIdentifier, "files"))
                .select("file_path", "value_counts", "record_count", "column_sizes")
                .collectAsList());

    List<Object[]> multiExpected =
        ImmutableList.of(
            row(file.path(), file.valueCounts(), file.recordCount(), file.columnSizes()));

    // Message corrected: four columns are selected, not a single element.
    assertEquals("Should prune multiple elements from a row", multiExpected, multiActual);
  }

  @Test
  public void testAllEntriesTable() throws Exception {
    // Verifies the "all_entries" metadata table, which must include entries from
    // every snapshot — including files removed by the delete commit below.
    TableIdentifier tableIdentifier = TableIdentifier.of("db", "entries_test");
    Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned());
    Table entriesTable = loadTable(tableIdentifier, "all_entries");

    Dataset<Row> df1 =
        spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
    Dataset<Row> df2 =
        spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "b")), SimpleRecord.class);

    df1.select("id", "data")
        .write()
        .format("iceberg")
        .mode("append")
        .save(loadLocation(tableIdentifier));

    // delete the first file to test that not only live files are listed
    table.newDelete().deleteFromRowFilter(Expressions.equal("id", 1)).commit();

    // add a second file
    df2.select("id", "data")
        .write()
        .format("iceberg")
        .mode("append")
        .save(loadLocation(tableIdentifier));

    // ensure table data isn't stale
    table.refresh();

    Dataset<Row> entriesTableDs =
        spark
            .read()
            .format("iceberg")
            .load(loadLocation(tableIdentifier, "all_entries"))
            .orderBy("snapshot_id");
    List<Row> actual = TestHelpers.selectNonDerived(entriesTableDs).collectAsList();

    // Build expected rows by reading every manifest of every snapshot directly.
    List<GenericData.Record> expected = Lists.newArrayList();
    for (ManifestFile manifest :
        Iterables.concat(Iterables.transform(table.snapshots(), s -> s.allManifests(table.io())))) {
      InputFile in = table.io().newInputFile(manifest.path());
      try (CloseableIterable<GenericData.Record> rows =
          Avro.read(in).project(entriesTable.schema()).build()) {
        // each row must inherit snapshot_id and sequence_number
        rows.forEach(
            row -> {
              // The current snapshot is the third commit (append, delete, append),
              // so its entries carry sequence number 3; older entries keep 1.
              if (row.get("snapshot_id").equals(table.currentSnapshot().snapshotId())) {
                row.put(2, 3L); // data sequence number
                row.put(3, 3L); // file sequence number
              } else {
                row.put(2, 1L); // data sequence number
                row.put(3, 1L); // file sequence number
              }
              GenericData.Record file = (GenericData.Record) row.get("data_file");
              TestHelpers.asMetadataRecord(file);
              expected.add(row);
            });
      }
    }

    // Match the orderBy("snapshot_id") applied to the actual rows above.
    expected.sort(Comparator.comparing(o -> (Long) o.get("snapshot_id")));

    assertThat(expected).as("Entries table should have 3 rows").hasSize(3);
    assertThat(actual).as("Actual results should have 3 rows").hasSize(3);

    for (int i = 0; i < expected.size(); i += 1) {
      TestHelpers.assertEqualsSafe(
          TestHelpers.nonDerivedSchema(entriesTableDs), expected.get(i), actual.get(i));
    }
  }

  @Test
  public void testCountEntriesTable() {
    // count() over both the "entries" and "all_entries" metadata tables should
    // reflect the single appended data file.
    TableIdentifier tableIdentifier = TableIdentifier.of("db", "count_entries_test");
    createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned());

    // init load
    Dataset<Row> inputDf =
        spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "1")), SimpleRecord.class);
    inputDf
        .select("id", "data")
        .write()
        .format("iceberg")
        .mode("append")
        .save(loadLocation(tableIdentifier));

    final int expectedEntryCount = 1;

    for (String metadataTable : Arrays.asList("entries", "all_entries")) {
      long count =
          spark
              .read()
              .format("iceberg")
              .load(loadLocation(tableIdentifier, metadataTable))
              .count();
      assertThat(count)
          .as("Count should return " + expectedEntryCount)
          .isEqualTo(expectedEntryCount);
    }
  }

  @Test
  public void testFilesTable() throws Exception {
    // Verifies the "files" metadata table lists only live files: after deleting the
    // first file's rows, only the second data file should appear.
    TableIdentifier tableIdentifier = TableIdentifier.of("db", "files_test");
    Table table = createTable(tableIdentifier, SCHEMA, SPEC);
    Table entriesTable = loadTable(tableIdentifier, "entries");

    Dataset<Row> df1 =
        spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
    Dataset<Row> df2 =
        spark.createDataFrame(Lists.newArrayList(new SimpleRecord(2, "b")), SimpleRecord.class);

    df1.select("id", "data")
        .write()
        .format("iceberg")
        .mode("append")
        .save(loadLocation(tableIdentifier));

    // add a second file
    df2.select("id", "data")
        .write()
        .format("iceberg")
        .mode("append")
        .save(loadLocation(tableIdentifier));

    // delete the first file to test that only live files are listed
    table.newDelete().deleteFromRowFilter(Expressions.equal("id", 1)).commit();

    Dataset<Row> filesTableDs =
        spark.read().format("iceberg").load(loadLocation(tableIdentifier, "files"));
    List<Row> actual = TestHelpers.selectNonDerived(filesTableDs).collectAsList();

    // Build expected files by reading the current snapshot's data manifests directly.
    List<GenericData.Record> expected = Lists.newArrayList();
    for (ManifestFile manifest : table.currentSnapshot().dataManifests(table.io())) {
      InputFile in = table.io().newInputFile(manifest.path());
      try (CloseableIterable<GenericData.Record> rows =
          Avro.read(in).project(entriesTable.schema()).build()) {
        for (GenericData.Record record : rows) {
          if ((Integer) record.get("status") < 2 /* added or existing */) {
            GenericData.Record file = (GenericData.Record) record.get("data_file");
            TestHelpers.asMetadataRecord(file);
            expected.add(file);
          }
        }
      }
    }

    assertThat(expected).as("Files table should have one row").hasSize(1);
    assertThat(actual).as("Actual results should have one row").hasSize(1);

    TestHelpers.assertEqualsSafe(
        TestHelpers.nonDerivedSchema(filesTableDs), expected.get(0), actual.get(0));
  }

  @Test
  public void testFilesTableWithSnapshotIdInheritance() throws Exception {
    // Verifies the "files" metadata table after importing an external Spark parquet
    // table with snapshot-id inheritance enabled (manifests written without explicit
    // snapshot ids inherit them from the commit).
    TableIdentifier tableIdentifier = TableIdentifier.of("db", "files_inheritance_test");
    Table table = createTable(tableIdentifier, SCHEMA, SPEC);
    table.updateProperties().set(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, "true").commit();
    Table entriesTable = loadTable(tableIdentifier, "entries");

    // Source table to import; dropped again by removeTable() after each test.
    spark.sql(
        String.format(
            "CREATE TABLE parquet_table (data string, id int) "
                + "USING parquet PARTITIONED BY (id) LOCATION '%s'",
            temp.toFile()));

    List<SimpleRecord> records =
        Lists.newArrayList(new SimpleRecord(1, "a"), new SimpleRecord(2, "b"));

    Dataset<Row> inputDF = spark.createDataFrame(records, SimpleRecord.class);
    inputDF.select("data", "id").write().mode("overwrite").insertInto("parquet_table");

    // Name mapping is needed so imported parquet files (no field ids) can be read.
    NameMapping mapping = MappingUtil.create(table.schema());
    String mappingJson = NameMappingParser.toJson(mapping);

    table.updateProperties().set(TableProperties.DEFAULT_NAME_MAPPING, mappingJson).commit();

    String stagingLocation = table.location() + "/metadata";
    SparkTableUtil.importSparkTable(
        spark,
        new org.apache.spark.sql.catalyst.TableIdentifier("parquet_table"),
        table,
        stagingLocation);

    Dataset<Row> filesTableDs =
        spark.read().format("iceberg").load(loadLocation(tableIdentifier, "files"));
    List<Row> actual = TestHelpers.selectNonDerived(filesTableDs).collectAsList();

    // Build expected rows from the imported manifests directly.
    List<GenericData.Record> expected = Lists.newArrayList();
    for (ManifestFile manifest : table.currentSnapshot().dataManifests(table.io())) {
      InputFile in = table.io().newInputFile(manifest.path());
      try (CloseableIterable<GenericData.Record> rows =
          Avro.read(in).project(entriesTable.schema()).build()) {
        for (GenericData.Record record : rows) {
          GenericData.Record file = (GenericData.Record) record.get("data_file");
          TestHelpers.asMetadataRecord(file);
          expected.add(file);
        }
      }
    }

    Types.StructType struct = TestHelpers.nonDerivedSchema(filesTableDs);
    assertThat(expected).as("Files table should have 2 rows").hasSize(2);
    assertThat(actual).as("Actual results should have 2 rows").hasSize(2);
    TestHelpers.assertEqualsSafe(struct, expected.get(0), actual.get(0));
    TestHelpers.assertEqualsSafe(struct, expected.get(1), actual.get(1));
  }

  @Test
  public void testV1EntriesTableWithSnapshotIdInheritance() throws Exception {
    // Verifies the "entries" metadata table for a format-version-1 table with
    // snapshot-id inheritance: imported entries inherit the commit's snapshot id
    // and carry sequence number 0 (asserted below).
    TableIdentifier tableIdentifier = TableIdentifier.of("db", "entries_inheritance_test");
    Map<String, String> properties = ImmutableMap.of(TableProperties.FORMAT_VERSION, "1");
    Table table = createTable(tableIdentifier, SCHEMA, SPEC, properties);

    table.updateProperties().set(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, "true").commit();

    // Source table to import; dropped again by removeTable() after each test.
    spark.sql(
        String.format(
            "CREATE TABLE parquet_table (data string, id int) "
                + "USING parquet PARTITIONED BY (id) LOCATION '%s'",
            temp.toFile()));

    List<SimpleRecord> records =
        Lists.newArrayList(new SimpleRecord(1, "a"), new SimpleRecord(2, "b"));

    Dataset<Row> inputDF = spark.createDataFrame(records, SimpleRecord.class);
    inputDF.select("data", "id").write().mode("overwrite").insertInto("parquet_table");

    String stagingLocation = table.location() + "/metadata";
    SparkTableUtil.importSparkTable(
        spark,
        new org.apache.spark.sql.catalyst.TableIdentifier("parquet_table"),
        table,
        stagingLocation);

    List<Row> actual =
        spark
            .read()
            .format("iceberg")
            .load(loadLocation(tableIdentifier, "entries"))
            .select("sequence_number", "snapshot_id", "data_file")
            .collectAsList();

    table.refresh();

    long snapshotId = table.currentSnapshot().snapshotId();

    assertThat(actual).as("Entries table should have 2 rows").hasSize(2);
    assertThat(actual.get(0).getLong(0)).as("Sequence number must match").isEqualTo(0);
    assertThat(actual.get(0).getLong(1)).as("Snapshot id must match").isEqualTo(snapshotId);
    assertThat(actual.get(1).getLong(0)).as("Sequence number must match").isEqualTo(0);
    assertThat(actual.get(1).getLong(1)).as("Snapshot id must match").isEqualTo(snapshotId);
  }

  @Test
  public void testFilesUnpartitionedTable() throws Exception {
    // Same live-files check as testFilesTable, but on an unpartitioned table and
    // deleting by file reference instead of a row filter.
    TableIdentifier tableIdentifier = TableIdentifier.of("db", "unpartitioned_files_test");
    Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned());
    Table entriesTable = loadTable(tableIdentifier, "entries");

    Dataset<Row> df1 =
        spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
    Dataset<Row> df2 =
        spark.createDataFrame(Lists.newArrayList(new SimpleRecord(2, "b")), SimpleRecord.class);

    df1.select("id", "data")
        .write()
        .format("iceberg")
        .mode("append")
        .save(loadLocation(tableIdentifier));

    // Capture the file just written so it can be deleted by reference below.
    table.refresh();
    DataFile toDelete =
        Iterables.getOnlyElement(table.currentSnapshot().addedDataFiles(table.io()));

    // add a second file
    df2.select("id", "data")
        .write()
        .format("iceberg")
        .mode("append")
        .save(loadLocation(tableIdentifier));

    // delete the first file to test that only live files are listed
    table.newDelete().deleteFile(toDelete).commit();

    Dataset<Row> filesTableDs =
        spark.read().format("iceberg").load(loadLocation(tableIdentifier, "files"));
    List<Row> actual = TestHelpers.selectNonDerived(filesTableDs).collectAsList();

    // Build expected files by reading the current snapshot's data manifests directly.
    List<GenericData.Record> expected = Lists.newArrayList();
    for (ManifestFile manifest : table.currentSnapshot().dataManifests(table.io())) {
      InputFile in = table.io().newInputFile(manifest.path());
      try (CloseableIterable<GenericData.Record> rows =
          Avro.read(in).project(entriesTable.schema()).build()) {
        for (GenericData.Record record : rows) {
          if ((Integer) record.get("status") < 2 /* added or existing */) {
            GenericData.Record file = (GenericData.Record) record.get("data_file");
            TestHelpers.asMetadataRecord(file);
            expected.add(file);
          }
        }
      }
    }

    assertThat(expected).as("Files table should have one row").hasSize(1);
    assertThat(actual).as("Actual results should have one row").hasSize(1);
    TestHelpers.assertEqualsSafe(
        TestHelpers.nonDerivedSchema(filesTableDs), expected.get(0), actual.get(0));
  }

  @Test
  public void testAllMetadataTablesWithStagedCommits() {
    // With write-audit-publish (WAP) enabled and a WAP id set on the session, the
    // appends below create staged snapshots that are never published: the table has
    // snapshots but no current snapshot. The all_data_files, all_manifests, and
    // all_entries metadata tables must still surface the staged files.
    TableIdentifier tableIdentifier = TableIdentifier.of("db", "stage_aggregate_table_test");
    Table table = createTable(tableIdentifier, SCHEMA, SPEC);

    table.updateProperties().set(TableProperties.WRITE_AUDIT_PUBLISH_ENABLED, "true").commit();
    spark.conf().set(SparkSQLProperties.WAP_ID, "1234567");
    try {
      Dataset<Row> df1 =
          spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
      Dataset<Row> df2 =
          spark.createDataFrame(Lists.newArrayList(new SimpleRecord(2, "b")), SimpleRecord.class);

      df1.select("id", "data")
          .write()
          .format("iceberg")
          .mode("append")
          .save(loadLocation(tableIdentifier));

      // add a second file
      df2.select("id", "data")
          .write()
          .format("iceberg")
          .mode("append")
          .save(loadLocation(tableIdentifier));

      List<Row> actualAllData =
          spark
              .read()
              .format("iceberg")
              .load(loadLocation(tableIdentifier, "all_data_files"))
              .collectAsList();

      List<Row> actualAllManifests =
          spark
              .read()
              .format("iceberg")
              .load(loadLocation(tableIdentifier, "all_manifests"))
              .collectAsList();

      List<Row> actualAllEntries =
          spark
              .read()
              .format("iceberg")
              .load(loadLocation(tableIdentifier, "all_entries"))
              .collectAsList();

      assertThat(table.snapshots().iterator())
          .as("Stage table should have some snapshots")
          .hasNext();
      assertThat(table.currentSnapshot())
          .as("Stage table should have null currentSnapshot")
          .isNull();
      assertThat(actualAllData).as("Actual results should have two rows").hasSize(2);
      assertThat(actualAllManifests).as("Actual results should have two rows").hasSize(2);
      assertThat(actualAllEntries).as("Actual results should have two rows").hasSize(2);
    } finally {
      // Fix: clear the session-level WAP id so staged (unpublished) commits do not
      // leak into subsequent tests that share this Spark session.
      spark.conf().unset(SparkSQLProperties.WAP_ID);
    }
  }

  @Test
  public void testAllDataFilesTable() throws Exception {
    // Verifies the "all_data_files" metadata table, which lists files across all
    // snapshots — including the file removed by the delete commit below.
    TableIdentifier tableIdentifier = TableIdentifier.of("db", "files_test");
    Table table = createTable(tableIdentifier, SCHEMA, SPEC);
    Table entriesTable = loadTable(tableIdentifier, "entries");

    Dataset<Row> df1 =
        spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
    Dataset<Row> df2 =
        spark.createDataFrame(Lists.newArrayList(new SimpleRecord(2, "b")), SimpleRecord.class);

    df1.select("id", "data")
        .write()
        .format("iceberg")
        .mode("append")
        .save(loadLocation(tableIdentifier));

    // delete the first file to test that not only live files are listed
    table.newDelete().deleteFromRowFilter(Expressions.equal("id", 1)).commit();

    // add a second file
    df2.select("id", "data")
        .write()
        .format("iceberg")
        .mode("append")
        .save(loadLocation(tableIdentifier));

    // ensure table data isn't stale
    table.refresh();

    Dataset<Row> filesTableDs =
        spark.read().format("iceberg").load(loadLocation(tableIdentifier, "all_data_files"));
    List<Row> actual = TestHelpers.selectNonDerived(filesTableDs).collectAsList();
    // Sort by file_path (column index 1) to align with the expected ordering below.
    actual.sort(Comparator.comparing(o -> o.getString(1)));

    // Build expected files by reading every snapshot's data manifests directly.
    List<GenericData.Record> expected = Lists.newArrayList();
    Iterable<ManifestFile> dataManifests =
        Iterables.concat(
            Iterables.transform(table.snapshots(), snapshot -> snapshot.dataManifests(table.io())));
    for (ManifestFile manifest : dataManifests) {
      InputFile in = table.io().newInputFile(manifest.path());
      try (CloseableIterable<GenericData.Record> rows =
          Avro.read(in).project(entriesTable.schema()).build()) {
        for (GenericData.Record record : rows) {
          if ((Integer) record.get("status") < 2 /* added or existing */) {
            GenericData.Record file = (GenericData.Record) record.get("data_file");
            TestHelpers.asMetadataRecord(file);
            expected.add(file);
          }
        }
      }
    }

    expected.sort(Comparator.comparing(o -> o.get("file_path").toString()));

    assertThat(expected).as("Files table should have two rows").hasSize(2);
    assertThat(actual).as("Actual results should have two rows").hasSize(2);
    for (int i = 0; i < expected.size(); i += 1) {
      TestHelpers.assertEqualsSafe(
          TestHelpers.nonDerivedSchema(filesTableDs), expected.get(i), actual.get(i));
    }
  }

  /**
   * Verifies the "history" metadata table: one row per commit, with made_current_at, parent ids,
   * and is_current_ancestor reflecting a rollback in the middle of the commit sequence.
   */
  @Test
  public void testHistoryTable() {
    TableIdentifier ident = TableIdentifier.of("db", "history_test");
    Table table = createTable(ident, SCHEMA, PartitionSpec.unpartitioned());
    Table historyTable = loadTable(ident, "history");

    Dataset<Row> df =
        spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "1")), SimpleRecord.class);

    // the same single-row append is committed three times at different points below
    Runnable append =
        () ->
            df.select("id", "data")
                .write()
                .format("iceberg")
                .mode("append")
                .save(loadLocation(ident));

    append.run();
    table.refresh();
    long snapshot1Time = table.currentSnapshot().timestampMillis();
    long snapshot1Id = table.currentSnapshot().snapshotId();

    append.run();
    table.refresh();
    long snapshot2Time = table.currentSnapshot().timestampMillis();
    long snapshot2Id = table.currentSnapshot().snapshotId();

    // rollback the table state to the first snapshot
    table.manageSnapshots().rollbackTo(snapshot1Id).commit();
    long rollbackTime = Iterables.getLast(table.history()).timestampMillis();

    append.run();
    table.refresh();
    long snapshot3Time = table.currentSnapshot().timestampMillis();
    long snapshot3Id = table.currentSnapshot().snapshotId();

    List<Row> actual =
        spark.read().format("iceberg").load(loadLocation(ident, "history")).collectAsList();

    GenericRecordBuilder builder =
        new GenericRecordBuilder(AvroSchemaUtil.convert(historyTable.schema(), "history"));
    // made_current_at is stored in microseconds, hence timestampMillis() * 1000
    List<GenericData.Record> expected =
        Lists.newArrayList(
            builder
                .set("made_current_at", snapshot1Time * 1000)
                .set("snapshot_id", snapshot1Id)
                .set("parent_id", null)
                .set("is_current_ancestor", true)
                .build(),
            builder
                .set("made_current_at", snapshot2Time * 1000)
                .set("snapshot_id", snapshot2Id)
                .set("parent_id", snapshot1Id)
                // commit rolled back, not an ancestor of the current table state
                .set("is_current_ancestor", false)
                .build(),
            builder
                .set("made_current_at", rollbackTime * 1000)
                .set("snapshot_id", snapshot1Id)
                .set("parent_id", null)
                .set("is_current_ancestor", true)
                .build(),
            builder
                .set("made_current_at", snapshot3Time * 1000)
                .set("snapshot_id", snapshot3Id)
                .set("parent_id", snapshot1Id)
                .set("is_current_ancestor", true)
                .build());

    assertThat(actual).as("History table should have a row for each commit").hasSize(4);
    for (int i = 0; i < expected.size(); i += 1) {
      TestHelpers.assertEqualsSafe(
          historyTable.schema().asStruct(), expected.get(i), actual.get(i));
    }
  }

  /**
   * Verifies the "snapshots" metadata table: it keeps a row for every snapshot in table metadata,
   * including the delete snapshot that was made non-current by a rollback.
   */
  @Test
  public void testSnapshotsTable() {
    TableIdentifier ident = TableIdentifier.of("db", "snapshots_test");
    Table table = createTable(ident, SCHEMA, PartitionSpec.unpartitioned());
    Table snapTable = loadTable(ident, "snapshots");

    Dataset<Row> df =
        spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "1")), SimpleRecord.class);

    df.select("id", "data").write().format("iceberg").mode("append").save(loadLocation(ident));

    table.refresh();
    Snapshot appendSnapshot = table.currentSnapshot();
    long appendTime = appendSnapshot.timestampMillis();
    long appendId = appendSnapshot.snapshotId();
    String appendManifestList = appendSnapshot.manifestListLocation();

    table.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit();

    Snapshot deleteSnapshot = table.currentSnapshot();
    long deleteTime = deleteSnapshot.timestampMillis();
    long deleteId = deleteSnapshot.snapshotId();
    String deleteManifestList = deleteSnapshot.manifestListLocation();

    // rollback the table state to the first snapshot
    table.manageSnapshots().rollbackTo(appendId).commit();

    List<Row> actual =
        spark.read().format("iceberg").load(loadLocation(ident, "snapshots")).collectAsList();

    GenericRecordBuilder builder =
        new GenericRecordBuilder(AvroSchemaUtil.convert(snapTable.schema(), "snapshots"));
    // committed_at is stored in microseconds, hence timestampMillis() * 1000
    List<GenericData.Record> expected =
        Lists.newArrayList(
            builder
                .set("committed_at", appendTime * 1000)
                .set("snapshot_id", appendId)
                .set("parent_id", null)
                .set("operation", "append")
                .set("manifest_list", appendManifestList)
                .set(
                    "summary",
                    ImmutableMap.of(
                        "added-records", "1",
                        "added-data-files", "1",
                        "changed-partition-count", "1",
                        "total-data-files", "1",
                        "total-records", "1"))
                .build(),
            builder
                .set("committed_at", deleteTime * 1000)
                .set("snapshot_id", deleteId)
                .set("parent_id", appendId)
                .set("operation", "delete")
                .set("manifest_list", deleteManifestList)
                .set(
                    "summary",
                    ImmutableMap.of(
                        "deleted-records", "1",
                        "deleted-data-files", "1",
                        "changed-partition-count", "1",
                        "total-records", "0",
                        "total-data-files", "0"))
                .build());

    assertThat(actual).as("Snapshots table should have a row for each snapshot").hasSize(2);
    for (int i = 0; i < expected.size(); i += 1) {
      TestHelpers.assertEqualsSafe(snapTable.schema().asStruct(), expected.get(i), actual.get(i));
    }
  }

  /**
   * Verifies column pruning on the "snapshots" metadata table: a projection of (operation,
   * committed_at, summary, parent_id) returns the expected values for both snapshots.
   */
  @Test
  public void testPrunedSnapshotsTable() {
    TableIdentifier ident = TableIdentifier.of("db", "snapshots_test");
    Table table = createTable(ident, SCHEMA, PartitionSpec.unpartitioned());

    Dataset<Row> df =
        spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "1")), SimpleRecord.class);

    df.select("id", "data").write().format("iceberg").mode("append").save(loadLocation(ident));

    table.refresh();
    Snapshot appendSnapshot = table.currentSnapshot();
    long appendTime = appendSnapshot.timestampMillis();
    long appendId = appendSnapshot.snapshotId();

    table.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit();

    long deleteTime = table.currentSnapshot().timestampMillis();

    // rollback the table state to the first snapshot
    table.manageSnapshots().rollbackTo(appendId).commit();

    Dataset<Row> actualDf =
        spark
            .read()
            .format("iceberg")
            .load(loadLocation(ident, "snapshots"))
            .select("operation", "committed_at", "summary", "parent_id");

    // schema of the pruned projection; used both to build expected records and to compare
    Schema projectedSchema = SparkSchemaUtil.convert(actualDf.schema());

    List<Row> actual = actualDf.collectAsList();

    GenericRecordBuilder builder =
        new GenericRecordBuilder(AvroSchemaUtil.convert(projectedSchema, "snapshots"));
    // committed_at is stored in microseconds, hence timestampMillis() * 1000
    List<GenericData.Record> expected =
        Lists.newArrayList(
            builder
                .set("committed_at", appendTime * 1000)
                .set("parent_id", null)
                .set("operation", "append")
                .set(
                    "summary",
                    ImmutableMap.of(
                        "added-records", "1",
                        "added-data-files", "1",
                        "changed-partition-count", "1",
                        "total-data-files", "1",
                        "total-records", "1"))
                .build(),
            builder
                .set("committed_at", deleteTime * 1000)
                .set("parent_id", appendId)
                .set("operation", "delete")
                .set(
                    "summary",
                    ImmutableMap.of(
                        "deleted-records", "1",
                        "deleted-data-files", "1",
                        "changed-partition-count", "1",
                        "total-records", "0",
                        "total-data-files", "0"))
                .build());

    assertThat(actual).as("Snapshots table should have a row for each snapshot").hasSize(2);
    for (int i = 0; i < expected.size(); i += 1) {
      TestHelpers.assertEqualsSafe(projectedSchema.asStruct(), expected.get(i), actual.get(i));
    }
  }

  /**
   * Verifies the "manifests" metadata table for a table holding one data manifest and one
   * delete manifest: counts apply only to the manifest's own content type.
   */
  @Test
  public void testManifestsTable() {
    TableIdentifier tableIdentifier = TableIdentifier.of("db", "manifests_test");
    Table table = createTable(tableIdentifier, SCHEMA, SPEC);
    Table manifestTable = loadTable(tableIdentifier, "manifests");
    Dataset<Row> inputDf =
        spark.createDataFrame(
            Lists.newArrayList(new SimpleRecord(1, "a"), new SimpleRecord(null, "b")),
            SimpleRecord.class);

    inputDf
        .select("id", "data")
        .write()
        .format("iceberg")
        .mode("append")
        .option(SparkWriteOptions.DISTRIBUTION_MODE, TableProperties.WRITE_DISTRIBUTION_MODE_NONE)
        .save(loadLocation(tableIdentifier));

    // position deletes require format version 2
    table.updateProperties().set(TableProperties.FORMAT_VERSION, "2").commit();

    table.newRowDelta().addDeletes(writePosDeleteFile(table)).commit();

    List<Row> actual =
        spark
            .read()
            .format("iceberg")
            .load(loadLocation(tableIdentifier, "manifests"))
            .collectAsList();

    table.refresh();

    GenericRecordBuilder builder =
        new GenericRecordBuilder(AvroSchemaUtil.convert(manifestTable.schema(), "manifests"));
    GenericRecordBuilder summaryBuilder =
        new GenericRecordBuilder(
            AvroSchemaUtil.convert(
                manifestTable.schema().findType("partition_summaries.element").asStructType(),
                "partition_summary"));
    List<GenericData.Record> expected =
        Lists.transform(
            table.currentSnapshot().allManifests(table.io()),
            manifest -> {
              // data-file counts apply to data manifests only; delete-file counts to delete
              // manifests only — the other kind reports 0
              boolean isData = manifest.content() == DATA;
              boolean isDeletes = manifest.content() == DELETES;
              return builder
                  .set("content", manifest.content().id())
                  .set("path", manifest.path())
                  .set("length", manifest.length())
                  .set("partition_spec_id", manifest.partitionSpecId())
                  .set("added_snapshot_id", manifest.snapshotId())
                  .set("added_data_files_count", isData ? manifest.addedFilesCount() : 0)
                  .set("existing_data_files_count", isData ? manifest.existingFilesCount() : 0)
                  .set("deleted_data_files_count", isData ? manifest.deletedFilesCount() : 0)
                  .set("added_delete_files_count", isDeletes ? manifest.addedFilesCount() : 0)
                  .set("existing_delete_files_count", isDeletes ? manifest.existingFilesCount() : 0)
                  .set("deleted_delete_files_count", isDeletes ? manifest.deletedFilesCount() : 0)
                  .set(
                      "partition_summaries",
                      Lists.transform(
                          manifest.partitions(),
                          partition ->
                              summaryBuilder
                                  .set("contains_null", isData)
                                  .set("contains_nan", false)
                                  .set("lower_bound", "1")
                                  .set("upper_bound", "1")
                                  .build()))
                  .build();
            });

    assertThat(actual).as("Manifests table should have two manifest rows").hasSize(2);
    for (int i = 0; i < expected.size(); i += 1) {
      TestHelpers.assertEqualsSafe(
          manifestTable.schema().asStruct(), expected.get(i), actual.get(i));
    }
  }

  /**
   * Verifies column pruning on the "manifests" metadata table: projecting a partial
   * partition-summary struct fails (on Spark 3, where nested pruning is pushed down), while
   * projecting the whole struct succeeds and returns the expected values.
   */
  @Test
  public void testPruneManifestsTable() {
    TableIdentifier tableIdentifier = TableIdentifier.of("db", "manifests_test");
    Table table = createTable(tableIdentifier, SCHEMA, SPEC);
    Dataset<Row> df1 =
        spark.createDataFrame(
            Lists.newArrayList(new SimpleRecord(1, "a"), new SimpleRecord(null, "b")),
            SimpleRecord.class);

    df1.select("id", "data")
        .write()
        .format("iceberg")
        .mode("append")
        .save(loadLocation(tableIdentifier));

    if (!spark.version().startsWith("2")) {
      // Spark 2 isn't able to actually push down nested struct projections so this will not break
      assertThatThrownBy(
              () ->
                  spark
                      .read()
                      .format("iceberg")
                      .load(loadLocation(tableIdentifier, "manifests"))
                      .select("partition_spec_id", "path", "partition_summaries.contains_null")
                      .collectAsList())
          .isInstanceOf(SparkException.class)
          .hasMessageContaining("Cannot project a partial list element struct");
    }

    Dataset<Row> actualDf =
        spark
            .read()
            .format("iceberg")
            .load(loadLocation(tableIdentifier, "manifests"))
            .select("partition_spec_id", "path", "partition_summaries");

    // schema of the pruned projection; used both to build expected records and to compare
    Schema projectedSchema = SparkSchemaUtil.convert(actualDf.schema());

    // reuse the projected dataset instead of scanning the metadata table a second time
    List<Row> actual = actualDf.collectAsList();

    table.refresh();

    GenericRecordBuilder builder =
        new GenericRecordBuilder(AvroSchemaUtil.convert(projectedSchema.asStruct()));
    GenericRecordBuilder summaryBuilder =
        new GenericRecordBuilder(
            AvroSchemaUtil.convert(
                projectedSchema.findType("partition_summaries.element").asStructType(),
                "partition_summary"));
    List<GenericData.Record> expected =
        Lists.transform(
            table.currentSnapshot().allManifests(table.io()),
            manifest ->
                builder
                    .set("partition_spec_id", manifest.partitionSpecId())
                    .set("path", manifest.path())
                    .set(
                        "partition_summaries",
                        Lists.transform(
                            manifest.partitions(),
                            partition ->
                                summaryBuilder
                                    .set("contains_null", true)
                                    .set("contains_nan", false)
                                    .set("lower_bound", "1")
                                    .set("upper_bound", "1")
                                    .build()))
                    .build());

    assertThat(actual).as("Manifests table should have one manifest row").hasSize(1);
    TestHelpers.assertEqualsSafe(projectedSchema.asStruct(), expected.get(0), actual.get(0));
  }

  /**
   * Verifies the "all_manifests" metadata table: every manifest of every snapshot appears,
   * across appends, row deltas, and delete commits.
   */
  @Test
  public void testAllManifestsTable() {
    TableIdentifier tableIdentifier = TableIdentifier.of("db", "manifests_test");
    Table table = createTable(tableIdentifier, SCHEMA, SPEC);
    Table manifestTable = loadTable(tableIdentifier, "all_manifests");
    Dataset<Row> inputDf =
        spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);

    inputDf
        .select("id", "data")
        .write()
        .format("iceberg")
        .mode("append")
        .save(loadLocation(tableIdentifier));

    // position deletes require format version 2
    table.updateProperties().set(TableProperties.FORMAT_VERSION, "2").commit();

    table.newRowDelta().addDeletes(writePosDeleteFile(table)).commit();

    table.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit();

    List<Row> actual =
        spark
            .read()
            .format("iceberg")
            .load(loadLocation(tableIdentifier, "all_manifests"))
            .orderBy("path")
            .collectAsList();

    table.refresh();

    // one expected record per (snapshot, manifest) pair, sorted by path to match `actual`
    List<GenericData.Record> expected =
        StreamSupport.stream(table.snapshots().spliterator(), false)
            .flatMap(
                snapshot ->
                    snapshot.allManifests(table.io()).stream()
                        .map(
                            manifest ->
                                manifestRecord(manifestTable, snapshot.snapshotId(), manifest)))
            .sorted(Comparator.comparing(record -> record.get("path").toString()))
            .collect(Collectors.toList());

    assertThat(actual).as("Manifests table should have 5 manifest rows").hasSize(5);
    for (int i = 0; i < expected.size(); i += 1) {
      TestHelpers.assertEqualsSafe(
          manifestTable.schema().asStruct(), expected.get(i), actual.get(i));
    }
  }

  /**
   * Verifies the "partitions" metadata table of an unpartitioned table: the schema has no
   * partition field and the table contains exactly one aggregate row.
   */
  @Test
  public void testUnpartitionedPartitionsTable() {
    TableIdentifier tableIdentifier = TableIdentifier.of("db", "unpartitioned_partitions_test");
    Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned());

    Dataset<Row> df =
        spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);

    df.select("id", "data")
        .write()
        .format("iceberg")
        .mode("append")
        .save(loadLocation(tableIdentifier));

    // pick up the snapshot created by the write above (consistent with the sibling tests)
    table.refresh();

    Types.StructType expectedSchema =
        Types.StructType.of(
            required(2, "record_count", Types.LongType.get(), "Count of records in data files"),
            required(3, "file_count", Types.IntegerType.get(), "Count of data files"),
            required(
                11,
                "total_data_file_size_in_bytes",
                Types.LongType.get(),
                "Total size in bytes of data files"),
            required(
                5,
                "position_delete_record_count",
                Types.LongType.get(),
                "Count of records in position delete files"),
            required(
                6,
                "position_delete_file_count",
                Types.IntegerType.get(),
                "Count of position delete files"),
            required(
                7,
                "equality_delete_record_count",
                Types.LongType.get(),
                "Count of records in equality delete files"),
            required(
                8,
                "equality_delete_file_count",
                Types.IntegerType.get(),
                "Count of equality delete files"),
            optional(
                9,
                "last_updated_at",
                Types.TimestampType.withZone(),
                "Commit time of snapshot that last updated this partition"),
            optional(
                10,
                "last_updated_snapshot_id",
                Types.LongType.get(),
                "Id of snapshot that last updated this partition"));

    Table partitionsTable = loadTable(tableIdentifier, "partitions");

    // fix: the actual value belongs in assertThat(), the expectation in isEqualTo(), so
    // failure messages read correctly
    assertThat(partitionsTable.schema().asStruct())
        .as("Schema should not have partition field")
        .isEqualTo(expectedSchema);

    GenericRecordBuilder builder =
        new GenericRecordBuilder(AvroSchemaUtil.convert(partitionsTable.schema(), "partitions"));
    // last_updated_at is stored in microseconds, hence timestampMillis() * 1000
    GenericData.Record expectedRow =
        builder
            .set("last_updated_at", table.currentSnapshot().timestampMillis() * 1000)
            .set("last_updated_snapshot_id", table.currentSnapshot().snapshotId())
            .set("record_count", 1L)
            .set("file_count", 1)
            .set(
                "total_data_file_size_in_bytes",
                totalSizeInBytes(table.currentSnapshot().addedDataFiles(table.io())))
            .set("position_delete_record_count", 0L)
            .set("position_delete_file_count", 0)
            .set("equality_delete_record_count", 0L)
            .set("equality_delete_file_count", 0)
            .build();

    List<Row> actual =
        spark
            .read()
            .format("iceberg")
            .load(loadLocation(tableIdentifier, "partitions"))
            .collectAsList();

    assertThat(actual).as("Unpartitioned partitions table should have one row").hasSize(1);
    TestHelpers.assertEqualsSafe(expectedSchema, expectedRow, actual.get(0));
  }

  /**
   * Verifies the "partitions" metadata table of a partitioned table: per-partition aggregates,
   * time travel to an earlier snapshot, and partition-predicate push down.
   */
  @Test
  public void testPartitionsTable() {
    TableIdentifier tableIdentifier = TableIdentifier.of("db", "partitions_test");
    Table table = createTable(tableIdentifier, SCHEMA, SPEC);
    Table partitionsTable = loadTable(tableIdentifier, "partitions");
    Dataset<Row> df1 =
        spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
    Dataset<Row> df2 =
        spark.createDataFrame(Lists.newArrayList(new SimpleRecord(2, "b")), SimpleRecord.class);

    df1.select("id", "data")
        .write()
        .format("iceberg")
        .mode("append")
        .save(loadLocation(tableIdentifier));

    table.refresh();
    long firstCommitId = table.currentSnapshot().snapshotId();

    // add a second file
    df2.select("id", "data")
        .write()
        .format("iceberg")
        .mode("append")
        .save(loadLocation(tableIdentifier));

    table.refresh();
    long secondCommitId = table.currentSnapshot().snapshotId();

    List<Row> actual =
        spark
            .read()
            .format("iceberg")
            .load(loadLocation(tableIdentifier, "partitions"))
            .orderBy("partition.id")
            .collectAsList();

    GenericRecordBuilder builder =
        new GenericRecordBuilder(AvroSchemaUtil.convert(partitionsTable.schema(), "partitions"));
    GenericRecordBuilder partitionBuilder =
        new GenericRecordBuilder(
            AvroSchemaUtil.convert(
                partitionsTable.schema().findType("partition").asStructType(), "partition"));
    List<GenericData.Record> expected = Lists.newArrayList();
    // last_updated_at is stored in microseconds, hence timestampMillis() * 1000
    expected.add(
        builder
            .set("partition", partitionBuilder.set("id", 1).build())
            .set("record_count", 1L)
            .set("file_count", 1)
            .set(
                "total_data_file_size_in_bytes",
                totalSizeInBytes(table.snapshot(firstCommitId).addedDataFiles(table.io())))
            .set("position_delete_record_count", 0L)
            .set("position_delete_file_count", 0)
            .set("equality_delete_record_count", 0L)
            .set("equality_delete_file_count", 0)
            .set("spec_id", 0)
            .set("last_updated_at", table.snapshot(firstCommitId).timestampMillis() * 1000)
            .set("last_updated_snapshot_id", firstCommitId)
            .build());
    expected.add(
        builder
            .set("partition", partitionBuilder.set("id", 2).build())
            .set("record_count", 1L)
            .set("file_count", 1)
            .set(
                "total_data_file_size_in_bytes",
                totalSizeInBytes(table.snapshot(secondCommitId).addedDataFiles(table.io())))
            .set("position_delete_record_count", 0L)
            .set("position_delete_file_count", 0)
            .set("equality_delete_record_count", 0L)
            .set("equality_delete_file_count", 0)
            .set("spec_id", 0)
            .set("last_updated_at", table.snapshot(secondCommitId).timestampMillis() * 1000)
            .set("last_updated_snapshot_id", secondCommitId)
            .build());

    assertThat(expected).as("Partitions table should have two rows").hasSize(2);
    assertThat(actual).as("Actual results should have two rows").hasSize(2);
    for (int i = 0; i < 2; i += 1) {
      TestHelpers.assertEqualsSafe(
          partitionsTable.schema().asStruct(), expected.get(i), actual.get(i));
    }

    // check time travel
    List<Row> actualAfterFirstCommit =
        spark
            .read()
            .format("iceberg")
            .option(SparkReadOptions.SNAPSHOT_ID, String.valueOf(firstCommitId))
            .load(loadLocation(tableIdentifier, "partitions"))
            .orderBy("partition.id")
            .collectAsList();

    assertThat(actualAfterFirstCommit).as("Actual results should have one row").hasSize(1);
    TestHelpers.assertEqualsSafe(
        partitionsTable.schema().asStruct(), expected.get(0), actualAfterFirstCommit.get(0));

    // check predicate push down
    List<Row> filtered =
        spark
            .read()
            .format("iceberg")
            .load(loadLocation(tableIdentifier, "partitions"))
            .filter("partition.id < 2")
            .collectAsList();

    assertThat(filtered).as("Actual results should have one row").hasSize(1);
    TestHelpers.assertEqualsSafe(
        partitionsTable.schema().asStruct(), expected.get(0), filtered.get(0));

    // a predicate that only partially maps to partition filters must not drop rows; ordered so
    // row i lines up with expected.get(i)
    List<Row> nonFiltered =
        spark
            .read()
            .format("iceberg")
            .load(loadLocation(tableIdentifier, "partitions"))
            .filter("partition.id < 2 or record_count=1")
            .orderBy("partition.id")
            .collectAsList();

    assertThat(nonFiltered).as("Actual results should have two rows").hasSize(2);
    for (int i = 0; i < 2; i += 1) {
      // fix: verify the rows returned by this query, not the earlier unfiltered `actual` scan
      TestHelpers.assertEqualsSafe(
          partitionsTable.schema().asStruct(), expected.get(i), nonFiltered.get(i));
    }
  }

  /**
   * Verifies that the "partitions" metadata table reports the snapshot that last updated each
   * partition, that a manifest rewrite does not change that attribution, and that expiring the
   * attributed snapshot clears last_updated_at / last_updated_snapshot_id for its partition.
   */
  @Test
  public void testPartitionsTableLastUpdatedSnapshot() {
    TableIdentifier tableIdentifier = TableIdentifier.of("db", "partitions_test");
    Table table = createTable(tableIdentifier, SCHEMA, SPEC);
    Table partitionsTable = loadTable(tableIdentifier, "partitions");
    Dataset<Row> df1 =
        spark.createDataFrame(
            Lists.newArrayList(new SimpleRecord(1, "1"), new SimpleRecord(2, "2")),
            SimpleRecord.class);
    Dataset<Row> df2 =
        spark.createDataFrame(Lists.newArrayList(new SimpleRecord(2, "20")), SimpleRecord.class);

    // first commit writes one file to partition id=1 and one to id=2
    df1.select("id", "data")
        .write()
        .format("iceberg")
        .mode("append")
        .save(loadLocation(tableIdentifier));

    table.refresh();
    long firstCommitId = table.currentSnapshot().snapshotId();

    // add a second file
    df2.select("id", "data")
        .write()
        .format("iceberg")
        .mode("append")
        .save(loadLocation(tableIdentifier));

    table.refresh();
    long secondCommitId = table.currentSnapshot().snapshotId();

    // check if rewrite manifest does not override metadata about data file's creating snapshot
    RewriteManifests.Result rewriteManifestResult =
        SparkActions.get().rewriteManifests(table).execute();
    assertThat(rewriteManifestResult.rewrittenManifests())
        .as("rewrite replaced 2 manifests")
        .hasSize(2);

    assertThat(rewriteManifestResult.addedManifests()).as("rewrite added 1 manifests").hasSize(1);

    List<Row> actual =
        spark
            .read()
            .format("iceberg")
            .load(loadLocation(tableIdentifier, "partitions"))
            .orderBy("partition.id")
            .collectAsList();

    GenericRecordBuilder builder =
        new GenericRecordBuilder(AvroSchemaUtil.convert(partitionsTable.schema(), "partitions"));
    GenericRecordBuilder partitionBuilder =
        new GenericRecordBuilder(
            AvroSchemaUtil.convert(
                partitionsTable.schema().findType("partition").asStructType(), "partition"));

    // expected file order: partition id=1 (first commit), then the two id=2 files
    List<DataFile> dataFiles = TestHelpers.dataFiles(table);
    assertDataFilePartitions(dataFiles, Arrays.asList(1, 2, 2));

    List<GenericData.Record> expected = Lists.newArrayList();
    // last_updated_at is stored in microseconds, hence timestampMillis() * 1000
    expected.add(
        builder
            .set("partition", partitionBuilder.set("id", 1).build())
            .set("record_count", 1L)
            .set("file_count", 1)
            .set("total_data_file_size_in_bytes", dataFiles.get(0).fileSizeInBytes())
            .set("position_delete_record_count", 0L)
            .set("position_delete_file_count", 0)
            .set("equality_delete_record_count", 0L)
            .set("equality_delete_file_count", 0)
            .set("spec_id", 0)
            .set("last_updated_at", table.snapshot(firstCommitId).timestampMillis() * 1000)
            .set("last_updated_snapshot_id", firstCommitId)
            .build());
    // partition id=2 aggregates its two files; the second commit last updated it
    expected.add(
        builder
            .set("partition", partitionBuilder.set("id", 2).build())
            .set("record_count", 2L)
            .set("file_count", 2)
            .set(
                "total_data_file_size_in_bytes",
                dataFiles.get(1).fileSizeInBytes() + dataFiles.get(2).fileSizeInBytes())
            .set("position_delete_record_count", 0L)
            .set("position_delete_file_count", 0)
            .set("equality_delete_record_count", 0L)
            .set("equality_delete_file_count", 0)
            .set("spec_id", 0)
            .set("last_updated_at", table.snapshot(secondCommitId).timestampMillis() * 1000)
            .set("last_updated_snapshot_id", secondCommitId)
            .build());

    assertThat(expected).as("Partitions table should have two rows").hasSize(2);
    assertThat(actual).as("Actual results should have two rows").hasSize(2);
    for (int i = 0; i < 2; i += 1) {
      TestHelpers.assertEqualsSafe(
          partitionsTable.schema().asStruct(), expected.get(i), actual.get(i));
    }

    // check predicate push down
    List<Row> filtered =
        spark
            .read()
            .format("iceberg")
            .load(loadLocation(tableIdentifier, "partitions"))
            .filter("partition.id < 2")
            .collectAsList();
    assertThat(filtered).as("Actual results should have one row").hasSize(1);
    TestHelpers.assertEqualsSafe(
        partitionsTable.schema().asStruct(), expected.get(0), filtered.get(0));

    // check for snapshot expiration
    // if snapshot with firstCommitId is expired,
    // we expect the partition of id=1 will no longer have last updated timestamp and snapshotId
    SparkActions.get().expireSnapshots(table).expireSnapshotId(firstCommitId).execute();
    GenericData.Record newPartitionRecord =
        builder
            .set("partition", partitionBuilder.set("id", 1).build())
            .set("record_count", 1L)
            .set("file_count", 1)
            .set("total_data_file_size_in_bytes", dataFiles.get(0).fileSizeInBytes())
            .set("position_delete_record_count", 0L)
            .set("position_delete_file_count", 0)
            .set("equality_delete_record_count", 0L)
            .set("equality_delete_file_count", 0)
            .set("spec_id", 0)
            .set("last_updated_at", null)
            .set("last_updated_snapshot_id", null)
            .build();
    // replace the id=1 expectation with the post-expiration record
    expected.remove(0);
    expected.add(0, newPartitionRecord);

    List<Row> actualAfterSnapshotExpiration =
        spark
            .read()
            .format("iceberg")
            .load(loadLocation(tableIdentifier, "partitions"))
            .collectAsList();
    assertThat(actualAfterSnapshotExpiration).as("Actual results should have two rows").hasSize(2);
    for (int i = 0; i < 2; i += 1) {
      TestHelpers.assertEqualsSafe(
          partitionsTable.schema().asStruct(),
          expected.get(i),
          actualAfterSnapshotExpiration.get(i));
    }
  }

  @Test
  public void testPartitionsTableDeleteStats() {
    // Verifies the partitions metadata table's delete statistics: position- and
    // equality-delete record/file counts per partition, plus last_updated_at and
    // last_updated_snapshot_id tracking across delete commits.
    TableIdentifier tableIdentifier = TableIdentifier.of("db", "partitions_test");
    Table table = createTable(tableIdentifier, SCHEMA, SPEC);
    Table partitionsTable = loadTable(tableIdentifier, "partitions");
    Dataset<Row> df1 =
        spark.createDataFrame(
            Lists.newArrayList(
                new SimpleRecord(1, "a"), new SimpleRecord(1, "b"), new SimpleRecord(1, "c")),
            SimpleRecord.class);
    Dataset<Row> df2 =
        spark.createDataFrame(
            Lists.newArrayList(
                new SimpleRecord(2, "d"), new SimpleRecord(2, "e"), new SimpleRecord(2, "f")),
            SimpleRecord.class);

    df1.select("id", "data")
        .write()
        .format("iceberg")
        .mode("append")
        .save(loadLocation(tableIdentifier));

    table.refresh();
    long firstCommitId = table.currentSnapshot().snapshotId();

    // add a second file
    df2.select("id", "data")
        .write()
        .format("iceberg")
        .mode("append")
        .save(loadLocation(tableIdentifier));

    // test position deletes
    // Row-level deletes require format version 2, so upgrade before writing deletes.
    table.updateProperties().set(TableProperties.FORMAT_VERSION, "2").commit();
    DeleteFile deleteFile1 = writePosDeleteFile(table, 0);
    DeleteFile deleteFile2 = writePosDeleteFile(table, 1);
    table.newRowDelta().addDeletes(deleteFile1).addDeletes(deleteFile2).commit();
    table.refresh();
    long posDeleteCommitId = table.currentSnapshot().snapshotId();

    List<Row> actual =
        spark
            .read()
            .format("iceberg")
            .load(loadLocation(tableIdentifier, "partitions"))
            .orderBy("partition.id")
            .collectAsList();
    assertThat(actual).as("Actual results should have two rows").hasSize(2);

    // Expected rows are built as Avro generic records against the partitions table
    // schema; last_updated_at is in microseconds (timestampMillis() * 1000).
    GenericRecordBuilder builder =
        new GenericRecordBuilder(AvroSchemaUtil.convert(partitionsTable.schema(), "partitions"));
    GenericRecordBuilder partitionBuilder =
        new GenericRecordBuilder(
            AvroSchemaUtil.convert(
                partitionsTable.schema().findType("partition").asStructType(), "partition"));
    List<GenericData.Record> expected = Lists.newArrayList();
    // Partition id=1: untouched by the position-delete commit, so delete counts are zero
    // and last_updated_* still reference the first commit.
    expected.add(
        builder
            .set("partition", partitionBuilder.set("id", 1).build())
            .set("record_count", 3L)
            .set("file_count", 1)
            .set(
                "total_data_file_size_in_bytes",
                totalSizeInBytes(table.snapshot(firstCommitId).addedDataFiles(table.io())))
            .set("position_delete_record_count", 0L)
            .set("position_delete_file_count", 0)
            .set("equality_delete_record_count", 0L)
            .set("equality_delete_file_count", 0)
            .set("spec_id", 0)
            .set("last_updated_at", table.snapshot(firstCommitId).timestampMillis() * 1000)
            .set("last_updated_snapshot_id", firstCommitId)
            .build());
    // Partition id=2: the position-delete commit targeted this partition's file.
    // NOTE(review): total size reuses the FIRST commit's added files — presumably both
    // data files are the same size here; confirm if the input records change.
    expected.add(
        builder
            .set("partition", partitionBuilder.set("id", 2).build())
            .set("record_count", 3L)
            .set("file_count", 1)
            .set(
                "total_data_file_size_in_bytes",
                totalSizeInBytes(table.snapshot(firstCommitId).addedDataFiles(table.io())))
            .set("position_delete_record_count", 2L) // should be incremented now
            .set("position_delete_file_count", 2) // should be incremented now
            .set("equality_delete_record_count", 0L)
            .set("equality_delete_file_count", 0)
            .set("spec_id", 0)
            .set("last_updated_at", table.snapshot(posDeleteCommitId).timestampMillis() * 1000)
            .set("last_updated_snapshot_id", posDeleteCommitId)
            .build());

    for (int i = 0; i < 2; i += 1) {
      TestHelpers.assertEqualsSafe(
          partitionsTable.schema().asStruct(), expected.get(i), actual.get(i));
    }

    // test equality delete
    DeleteFile eqDeleteFile1 = writeEqDeleteFile(table, "d");
    DeleteFile eqDeleteFile2 = writeEqDeleteFile(table, "f");
    table.newRowDelta().addDeletes(eqDeleteFile1).addDeletes(eqDeleteFile2).commit();
    table.refresh();
    long eqDeleteCommitId = table.currentSnapshot().snapshotId();
    actual =
        spark
            .read()
            .format("iceberg")
            .load(loadLocation(tableIdentifier, "partitions"))
            .orderBy("partition.id")
            .collectAsList();
    assertThat(actual).as("Actual results should have two rows").hasSize(2);
    // Rebuild the expected record for partition id=1 with updated equality-delete stats.
    // NOTE(review): fields not re-set here (total_data_file_size_in_bytes, spec_id)
    // presumably carry over from the earlier set() calls on this builder — confirm
    // GenericRecordBuilder retains values across build() calls.
    expected.remove(0);
    expected.add(
        0,
        builder
            .set("partition", partitionBuilder.set("id", 1).build())
            .set("record_count", 3L)
            .set("file_count", 1)
            .set("position_delete_record_count", 0L)
            .set("position_delete_file_count", 0)
            .set("equality_delete_record_count", 2L) // should be incremented now
            .set("equality_delete_file_count", 2) // should be incremented now
            .set("last_updated_at", table.snapshot(eqDeleteCommitId).timestampMillis() * 1000)
            .set("last_updated_snapshot_id", eqDeleteCommitId)
            .build());
    for (int i = 0; i < 2; i += 1) {
      TestHelpers.assertEqualsSafe(
          partitionsTable.schema().asStruct(), expected.get(i), actual.get(i));
    }
  }

  @Test
  public synchronized void testSnapshotReadAfterAddColumn() {
    // Verifies that a snapshot-id time-travel read uses the schema that was current
    // when the snapshot was committed, not the latest table schema.
    TableIdentifier tableIdentifier = TableIdentifier.of("db", "table");
    Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned());

    List<Row> originalRecords =
        Lists.newArrayList(
            RowFactory.create(1, "x"), RowFactory.create(2, "y"), RowFactory.create(3, "z"));

    StructType originalSparkSchema = SparkSchemaUtil.convert(SCHEMA);
    Dataset<Row> inputDf = spark.createDataFrame(originalRecords, originalSparkSchema);
    inputDf
        .select("id", "data")
        .write()
        .format("iceberg")
        .mode(SaveMode.Append)
        .save(loadLocation(tableIdentifier));

    table.refresh();

    // Fix: assert with the actual result as the subject and the expected value as the
    // argument so AssertJ failure messages read correctly (consistent with the other
    // tests in this class).
    Dataset<Row> resultDf = spark.read().format("iceberg").load(loadLocation(tableIdentifier));
    assertThat(resultDf.orderBy("id").collectAsList())
        .as("Records should match")
        .isEqualTo(originalRecords);

    Snapshot snapshotBeforeAddColumn = table.currentSnapshot();

    table.updateSchema().addColumn("category", Types.StringType.get()).commit();

    List<Row> newRecords =
        Lists.newArrayList(RowFactory.create(4, "xy", "B"), RowFactory.create(5, "xyz", "C"));

    StructType newSparkSchema = SparkSchemaUtil.convert(SCHEMA2);
    Dataset<Row> inputDf2 = spark.createDataFrame(newRecords, newSparkSchema);
    inputDf2
        .select("id", "data", "category")
        .write()
        .format("iceberg")
        .mode(SaveMode.Append)
        .save(loadLocation(tableIdentifier));

    table.refresh();

    // Rows written before the column was added surface a null "category".
    List<Row> updatedRecords =
        Lists.newArrayList(
            RowFactory.create(1, "x", null),
            RowFactory.create(2, "y", null),
            RowFactory.create(3, "z", null),
            RowFactory.create(4, "xy", "B"),
            RowFactory.create(5, "xyz", "C"));

    Dataset<Row> resultDf2 = spark.read().format("iceberg").load(loadLocation(tableIdentifier));
    assertThat(resultDf2.orderBy("id").collectAsList())
        .as("Records should match")
        .isEqualTo(updatedRecords);

    // Time-travel to the pre-add-column snapshot: both data and schema must match the
    // original state.
    Dataset<Row> resultDf3 =
        spark
            .read()
            .format("iceberg")
            .option(SparkReadOptions.SNAPSHOT_ID, snapshotBeforeAddColumn.snapshotId())
            .load(loadLocation(tableIdentifier));

    assertThat(resultDf3.orderBy("id").collectAsList())
        .as("Records should match")
        .isEqualTo(originalRecords);

    assertThat(resultDf3.schema()).as("Schemas should match").isEqualTo(originalSparkSchema);
  }

  @Test
  public synchronized void testSnapshotReadAfterDropColumn() {
    // Verifies timestamp-based time travel (AS_OF_TIMESTAMP): a read as-of a time before
    // a column drop must use the pre-drop schema, and a schema change without a new
    // snapshot does not change which snapshot a timestamp read resolves to.
    // NOTE(review): synchronized presumably guards the wall-clock ordering this test
    // relies on against concurrently running tests — confirm.
    TableIdentifier tableIdentifier = TableIdentifier.of("db", "table");
    Table table = createTable(tableIdentifier, SCHEMA2, PartitionSpec.unpartitioned());

    List<Row> originalRecords =
        Lists.newArrayList(
            RowFactory.create(1, "x", "A"),
            RowFactory.create(2, "y", "A"),
            RowFactory.create(3, "z", "B"));

    StructType originalSparkSchema = SparkSchemaUtil.convert(SCHEMA2);
    Dataset<Row> inputDf = spark.createDataFrame(originalRecords, originalSparkSchema);
    inputDf
        .select("id", "data", "category")
        .write()
        .format("iceberg")
        .mode(SaveMode.Append)
        .save(loadLocation(tableIdentifier));

    table.refresh();

    Dataset<Row> resultDf = spark.read().format("iceberg").load(loadLocation(tableIdentifier));

    assertThat(resultDf.orderBy("id").collectAsList())
        .as("Records should match")
        .isEqualTo(originalRecords);

    // Capture timestamps strictly before and after the column drop (waitUntilAfter
    // presumably blocks until the clock has passed the given millis — see helper).
    long tsBeforeDropColumn = waitUntilAfter(System.currentTimeMillis());
    table.updateSchema().deleteColumn("data").commit();
    long tsAfterDropColumn = waitUntilAfter(System.currentTimeMillis());

    List<Row> newRecords = Lists.newArrayList(RowFactory.create(4, "B"), RowFactory.create(5, "C"));

    StructType newSparkSchema = SparkSchemaUtil.convert(SCHEMA3);
    Dataset<Row> inputDf2 = spark.createDataFrame(newRecords, newSparkSchema);
    inputDf2
        .select("id", "category")
        .write()
        .format("iceberg")
        .mode(SaveMode.Append)
        .save(loadLocation(tableIdentifier));

    table.refresh();

    List<Row> updatedRecords =
        Lists.newArrayList(
            RowFactory.create(1, "A"),
            RowFactory.create(2, "A"),
            RowFactory.create(3, "B"),
            RowFactory.create(4, "B"),
            RowFactory.create(5, "C"));

    Dataset<Row> resultDf2 = spark.read().format("iceberg").load(loadLocation(tableIdentifier));
    assertThat(resultDf2.orderBy("id").collectAsList())
        .as("Records should match")
        .isEqualTo(updatedRecords);

    // Read as-of the timestamp before the drop: original data and schema expected.
    Dataset<Row> resultDf3 =
        spark
            .read()
            .format("iceberg")
            .option(SparkReadOptions.AS_OF_TIMESTAMP, tsBeforeDropColumn)
            .load(loadLocation(tableIdentifier));

    assertThat(resultDf3.orderBy("id").collectAsList())
        .as("Records should match")
        .isEqualTo(originalRecords);

    assertThat(resultDf3.schema()).as("Schemas should match").isEqualTo(originalSparkSchema);

    // At tsAfterDropColumn, there has been a schema change, but no new snapshot,
    // so the snapshot as of tsAfterDropColumn is the same as that as of tsBeforeDropColumn.
    Dataset<Row> resultDf4 =
        spark
            .read()
            .format("iceberg")
            .option(SparkReadOptions.AS_OF_TIMESTAMP, tsAfterDropColumn)
            .load(loadLocation(tableIdentifier));

    assertThat(resultDf4.orderBy("id").collectAsList())
        .as("Records should match")
        .isEqualTo(originalRecords);

    assertThat(resultDf4.schema()).as("Schemas should match").isEqualTo(originalSparkSchema);
  }

  @Test
  public synchronized void testSnapshotReadAfterAddAndDropColumn() {
    // Verifies snapshot-id time travel across both an added and a subsequently dropped
    // column: the time-travel read must still return the original data and schema.
    // NOTE(review): synchronized presumably serializes against other schema-evolution
    // tests sharing the session — confirm.
    TableIdentifier tableIdentifier = TableIdentifier.of("db", "table");
    Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned());

    List<Row> originalRecords =
        Lists.newArrayList(
            RowFactory.create(1, "x"), RowFactory.create(2, "y"), RowFactory.create(3, "z"));

    StructType originalSparkSchema = SparkSchemaUtil.convert(SCHEMA);
    Dataset<Row> inputDf = spark.createDataFrame(originalRecords, originalSparkSchema);
    inputDf
        .select("id", "data")
        .write()
        .format("iceberg")
        .mode(SaveMode.Append)
        .save(loadLocation(tableIdentifier));

    table.refresh();

    Dataset<Row> resultDf = spark.read().format("iceberg").load(loadLocation(tableIdentifier));

    assertThat(resultDf.orderBy("id").collectAsList())
        .as("Records should match")
        .isEqualTo(originalRecords);

    Snapshot snapshotBeforeAddColumn = table.currentSnapshot();

    table.updateSchema().addColumn("category", Types.StringType.get()).commit();

    List<Row> newRecords =
        Lists.newArrayList(RowFactory.create(4, "xy", "B"), RowFactory.create(5, "xyz", "C"));

    StructType sparkSchemaAfterAddColumn = SparkSchemaUtil.convert(SCHEMA2);
    Dataset<Row> inputDf2 = spark.createDataFrame(newRecords, sparkSchemaAfterAddColumn);
    inputDf2
        .select("id", "data", "category")
        .write()
        .format("iceberg")
        .mode(SaveMode.Append)
        .save(loadLocation(tableIdentifier));

    table.refresh();

    // Pre-existing rows read back with null for the newly added column.
    List<Row> updatedRecords =
        Lists.newArrayList(
            RowFactory.create(1, "x", null),
            RowFactory.create(2, "y", null),
            RowFactory.create(3, "z", null),
            RowFactory.create(4, "xy", "B"),
            RowFactory.create(5, "xyz", "C"));

    Dataset<Row> resultDf2 = spark.read().format("iceberg").load(loadLocation(tableIdentifier));

    assertThat(resultDf2.orderBy("id").collectAsList())
        .as("Records should match")
        .isEqualTo(updatedRecords);

    table.updateSchema().deleteColumn("data").commit();

    // After dropping "data", rows 1-3 have only null left for "category".
    List<Row> recordsAfterDropColumn =
        Lists.newArrayList(
            RowFactory.create(1, null),
            RowFactory.create(2, null),
            RowFactory.create(3, null),
            RowFactory.create(4, "B"),
            RowFactory.create(5, "C"));

    Dataset<Row> resultDf3 = spark.read().format("iceberg").load(loadLocation(tableIdentifier));

    assertThat(resultDf3.orderBy("id").collectAsList())
        .as("Records should match")
        .isEqualTo(recordsAfterDropColumn);

    // Time-travel to before either schema change: original data and schema expected.
    Dataset<Row> resultDf4 =
        spark
            .read()
            .format("iceberg")
            .option(SparkReadOptions.SNAPSHOT_ID, snapshotBeforeAddColumn.snapshotId())
            .load(loadLocation(tableIdentifier));

    assertThat(resultDf4.orderBy("id").collectAsList())
        .as("Records should match")
        .isEqualTo(originalRecords);

    assertThat(resultDf4.schema()).as("Schemas should match").isEqualTo(originalSparkSchema);
  }

  @Test
  public void testRemoveOrphanFilesActionSupport() throws InterruptedException {
    // Verifies the delete-orphan-files action: a metadata-scoped run removes nothing,
    // a full run removes exactly the untracked file, and committed data is preserved.
    TableIdentifier tableIdentifier = TableIdentifier.of("db", "table");
    Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned());

    // Seed the table with a single record.
    List<SimpleRecord> records = Lists.newArrayList(new SimpleRecord(1, "1"));
    Dataset<Row> inputDf = spark.createDataFrame(records, SimpleRecord.class);
    inputDf
        .select("id", "data")
        .write()
        .format("iceberg")
        .mode("append")
        .save(loadLocation(tableIdentifier));

    // Drop an untracked parquet file into the table's data directory to create an orphan.
    inputDf.write().mode("append").parquet(table.location() + "/data");

    // sleep for 1 second to ensure files will be old enough
    Thread.sleep(1000);

    SparkActions actions = SparkActions.get();

    // Scoped to the metadata directory, nothing should be deleted.
    DeleteOrphanFiles.Result metadataOnlyResult =
        actions
            .deleteOrphanFiles(table)
            .location(table.location() + "/metadata")
            .olderThan(System.currentTimeMillis())
            .execute();
    assertThat(metadataOnlyResult.orphanFileLocations())
        .as("Should not delete any metadata files")
        .isEmpty();

    // A full scan should find exactly the one orphan data file.
    DeleteOrphanFiles.Result fullScanResult =
        actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).execute();
    assertThat(fullScanResult.orphanFileLocations()).as("Should delete 1 data file").hasSize(1);

    // Removing the orphan must not affect committed table data.
    Dataset<Row> resultDF = spark.read().format("iceberg").load(loadLocation(tableIdentifier));
    List<SimpleRecord> actualRecords =
        resultDF.as(Encoders.bean(SimpleRecord.class)).collectAsList();
    assertThat(actualRecords).as("Rows must match").isEqualTo(records);
  }

  @Test
  public void testFilesTablePartitionId() {
    // Verifies that the files metadata table exposes the spec_id each data file was
    // written under, across a partition-spec change.
    TableIdentifier tableIdentifier = TableIdentifier.of("db", "files_test");
    Table table =
        createTable(
            tableIdentifier, SCHEMA, PartitionSpec.builderFor(SCHEMA).identity("id").build());
    int spec0 = table.spec().specId();

    Dataset<Row> df1 =
        spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
    Dataset<Row> df2 =
        spark.createDataFrame(Lists.newArrayList(new SimpleRecord(2, "b")), SimpleRecord.class);

    df1.select("id", "data")
        .write()
        .format("iceberg")
        .mode("append")
        .save(loadLocation(tableIdentifier));

    // change partition spec
    table.refresh();
    table.updateSpec().removeField("id").commit();
    int spec1 = table.spec().specId();

    // add a second file
    df2.select("id", "data")
        .write()
        .format("iceberg")
        .mode("append")
        .save(loadLocation(tableIdentifier));

    List<Integer> actual =
        spark.read().format("iceberg").load(loadLocation(tableIdentifier, "files"))
            .sort(DataFile.SPEC_ID.name()).collectAsList().stream()
            .map(r -> (Integer) r.getAs(DataFile.SPEC_ID.name()))
            .collect(Collectors.toList());

    // Fix: assert with the actual result as the subject and the expected spec ids as
    // the argument so AssertJ failure messages read correctly.
    assertThat(actual)
        .as("Should have two partition specs")
        .isEqualTo(ImmutableList.of(spec0, spec1));
  }

  @Test
  public void testAllManifestTableSnapshotFiltering() {
    // Verifies that the all_manifests metadata table can be filtered by
    // reference_snapshot_id, returning only manifests referenced by those snapshots.
    TableIdentifier tableIdentifier = TableIdentifier.of("db", "all_manifest_snapshot_filtering");
    Table table = createTable(tableIdentifier, SCHEMA, SPEC);
    Table manifestTable = loadTable(tableIdentifier, "all_manifests");
    Dataset<Row> df =
        spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);

    List<Pair<Long, ManifestFile>> snapshotIdToManifests = Lists.newArrayList();

    df.select("id", "data")
        .write()
        .format("iceberg")
        .mode("append")
        .save(loadLocation(tableIdentifier));

    table.refresh();
    Snapshot snapshot1 = table.currentSnapshot();
    snapshotIdToManifests.addAll(
        snapshot1.allManifests(table.io()).stream()
            .map(manifest -> Pair.of(snapshot1.snapshotId(), manifest))
            .collect(Collectors.toList()));

    df.select("id", "data")
        .write()
        .format("iceberg")
        .mode("append")
        .save(loadLocation(tableIdentifier));

    table.refresh();
    Snapshot snapshot2 = table.currentSnapshot();
    assertThat(snapshot2.allManifests(table.io())).as("Should have two manifests").hasSize(2);
    snapshotIdToManifests.addAll(
        snapshot2.allManifests(table.io()).stream()
            .map(manifest -> Pair.of(snapshot2.snapshotId(), manifest))
            .collect(Collectors.toList()));

    // Add manifests that will not be selected
    df.select("id", "data")
        .write()
        .format("iceberg")
        .mode("append")
        .save(loadLocation(tableIdentifier));
    df.select("id", "data")
        .write()
        .format("iceberg")
        .mode("append")
        .save(loadLocation(tableIdentifier));

    // Build an SQL IN-list like "(id1,id2)" for the two snapshots of interest.
    // Fix: removed a no-op "snapshotIds.toString();" statement whose result was discarded.
    StringJoiner snapshotIds = new StringJoiner(",", "(", ")");
    snapshotIds.add(String.valueOf(snapshot1.snapshotId()));
    snapshotIds.add(String.valueOf(snapshot2.snapshotId()));

    List<Row> actual =
        spark
            .read()
            .format("iceberg")
            .load(loadLocation(tableIdentifier, "all_manifests"))
            .filter("reference_snapshot_id in " + snapshotIds)
            .orderBy("path")
            .collectAsList();
    table.refresh();

    // Expected: one manifest record per (snapshot, manifest) pair, sorted by path to
    // match the query's ordering.
    List<GenericData.Record> expected =
        snapshotIdToManifests.stream()
            .map(
                snapshotManifest ->
                    manifestRecord(
                        manifestTable, snapshotManifest.first(), snapshotManifest.second()))
            .sorted(Comparator.comparing(o -> o.get("path").toString()))
            .collect(Collectors.toList());

    assertThat(actual).as("Manifests table should have 3 manifest rows").hasSize(3);
    for (int i = 0; i < expected.size(); i += 1) {
      TestHelpers.assertEqualsSafe(
          manifestTable.schema().asStruct(), expected.get(i), actual.get(i));
    }
  }

  @Test
  public void testTableWithInt96Timestamp() throws IOException {
    // Verifies that Iceberg correctly reads timestamps imported from a Spark parquet
    // table written with INT96 timestamps, across all combinations of parquet
    // dictionary encoding and Iceberg vectorized reads.
    File parquetTableDir = temp.resolve("table_timestamp_int96").toFile();
    String parquetTableLocation = parquetTableDir.toURI().toString();
    Schema schema =
        new Schema(
            optional(1, "id", Types.LongType.get()),
            optional(2, "tmp_col", Types.TimestampType.withZone()));
    // NOTE(review): this changes the session-wide parquet timestamp type and does not
    // restore the previous value — confirm later tests are unaffected.
    spark.conf().set(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE().key(), "INT96");

    // Derive timestamps from the id column, spread across [2000-01-31, 2100-01-01).
    LocalDateTime start = LocalDateTime.of(2000, 1, 31, 0, 0, 0);
    LocalDateTime end = LocalDateTime.of(2100, 1, 1, 0, 0, 0);
    long startSec = start.toEpochSecond(ZoneOffset.UTC);
    long endSec = end.toEpochSecond(ZoneOffset.UTC);
    Column idColumn = functions.expr("id");
    Column secondsColumn =
        functions.expr("(id % " + (endSec - startSec) + " + " + startSec + ")").as("seconds");
    Column timestampColumn = functions.expr("cast( seconds as timestamp) as tmp_col");

    for (Boolean useDict : new Boolean[] {true, false}) {
      for (Boolean useVectorization : new Boolean[] {true, false}) {
        spark.sql("DROP TABLE IF EXISTS parquet_table");
        spark
            .range(0, 5000, 100, 1)
            .select(idColumn, secondsColumn)
            .select(idColumn, timestampColumn)
            .write()
            .format("parquet")
            .option("parquet.enable.dictionary", useDict)
            .mode("overwrite")
            .option("path", parquetTableLocation)
            .saveAsTable("parquet_table");
        TableIdentifier tableIdentifier = TableIdentifier.of("db", "table_with_timestamp_int96");
        Table table = createTable(tableIdentifier, schema, PartitionSpec.unpartitioned());
        table
            .updateProperties()
            .set(TableProperties.PARQUET_VECTORIZATION_ENABLED, useVectorization.toString())
            .commit();

        // Import the raw parquet files into the Iceberg table.
        String stagingLocation = table.location() + "/metadata";
        SparkTableUtil.importSparkTable(
            spark,
            new org.apache.spark.sql.catalyst.TableIdentifier("parquet_table"),
            table,
            stagingLocation);

        // validate we get the expected results back
        testWithFilter("tmp_col < to_timestamp('2000-01-31 08:30:00')", tableIdentifier);
        testWithFilter("tmp_col <= to_timestamp('2000-01-31 08:30:00')", tableIdentifier);
        testWithFilter("tmp_col == to_timestamp('2000-01-31 08:30:00')", tableIdentifier);
        testWithFilter("tmp_col > to_timestamp('2000-01-31 08:30:00')", tableIdentifier);
        testWithFilter("tmp_col >= to_timestamp('2000-01-31 08:30:00')", tableIdentifier);
        dropTable(tableIdentifier);
      }
    }
  }

  /**
   * Asserts that applying {@code filterExpr} to the Iceberg table yields the same rows
   * (in any order) as applying it to the raw "parquet_table" source.
   */
  private void testWithFilter(String filterExpr, TableIdentifier tableIdentifier) {
    // The raw parquet table is the source of truth for the filtered result.
    Dataset<Row> expectedDf = spark.table("parquet_table").select("tmp_col").filter(filterExpr);
    Dataset<Row> actualDf =
        spark
            .read()
            .format("iceberg")
            .load(loadLocation(tableIdentifier))
            .select("tmp_col")
            .filter(filterExpr);
    assertThat(actualDf.collectAsList())
        .as("Rows must match")
        .containsExactlyInAnyOrderElementsOf(expectedDf.collectAsList());
  }

  /**
   * Builds the expected Avro record for one row of the manifests/all_manifests metadata
   * table from a {@link ManifestFile}.
   *
   * <p>Data-file counts are populated only for DATA manifests and delete-file counts only
   * for DELETES manifests; the other group is zeroed.
   */
  private GenericData.Record manifestRecord(
      Table manifestTable, Long referenceSnapshotId, ManifestFile manifest) {
    GenericRecordBuilder builder =
        new GenericRecordBuilder(AvroSchemaUtil.convert(manifestTable.schema(), "manifests"));
    GenericRecordBuilder summaryBuilder =
        new GenericRecordBuilder(
            AvroSchemaUtil.convert(
                manifestTable.schema().findType("partition_summaries.element").asStructType(),
                "partition_summary"));
    return builder
        .set("content", manifest.content().id())
        .set("path", manifest.path())
        .set("length", manifest.length())
        .set("partition_spec_id", manifest.partitionSpecId())
        .set("added_snapshot_id", manifest.snapshotId())
        .set("added_data_files_count", manifest.content() == DATA ? manifest.addedFilesCount() : 0)
        .set(
            "existing_data_files_count",
            manifest.content() == DATA ? manifest.existingFilesCount() : 0)
        .set(
            "deleted_data_files_count",
            manifest.content() == DATA ? manifest.deletedFilesCount() : 0)
        .set(
            "added_delete_files_count",
            manifest.content() == DELETES ? manifest.addedFilesCount() : 0)
        .set(
            "existing_delete_files_count",
            manifest.content() == DELETES ? manifest.existingFilesCount() : 0)
        .set(
            "deleted_delete_files_count",
            manifest.content() == DELETES ? manifest.deletedFilesCount() : 0)
        .set(
            "partition_summaries",
            // Hard-coded summary values: assumes callers write a single partition value
            // of 1 with no nulls/NaNs — TODO confirm for any new usage.
            Lists.transform(
                manifest.partitions(),
                partition ->
                    summaryBuilder
                        .set("contains_null", false)
                        .set("contains_nan", false)
                        .set("lower_bound", "1")
                        .set("upper_bound", "1")
                        .build()))
        .set("reference_snapshot_id", referenceSnapshotId)
        .build();
  }

  /** Creates a position-delete writer targeting a fresh output file for the given spec/partition. */
  private PositionDeleteWriter<InternalRow> newPositionDeleteWriter(
      Table table, PartitionSpec spec, StructLike partition) {
    // Allocate the output file from a factory bound to this table, then build the
    // writer from the table's file-writer configuration.
    EncryptedOutputFile deleteOutputFile =
        OutputFileFactory.builderFor(table, 0, 0).build().newOutputFile(spec, partition);
    return SparkFileWriterFactory.builderFor(table)
        .build()
        .newPositionDeleteWriter(deleteOutputFile, spec, partition);
  }

  /**
   * Writes the given position deletes to a single delete file and returns its metadata.
   *
   * <p>{@code toDeleteFile()} is invoked only after the try-with-resources block has
   * closed the writer.
   */
  private DeleteFile writePositionDeletes(
      Table table,
      PartitionSpec spec,
      StructLike partition,
      Iterable<PositionDelete<InternalRow>> deletes) {
    PositionDeleteWriter<InternalRow> positionDeleteWriter =
        newPositionDeleteWriter(table, spec, partition);

    try (PositionDeleteWriter<InternalRow> writer = positionDeleteWriter) {
      for (PositionDelete<InternalRow> delete : deletes) {
        writer.write(delete);
      }
    } catch (IOException e) {
      // Surface I/O failures as unchecked so test code stays uncluttered.
      throw new UncheckedIOException(e);
    }

    return positionDeleteWriter.toDeleteFile();
  }

  /** Writes a position delete for position 0 of the current snapshot's first added data file. */
  private DeleteFile writePosDeleteFile(Table table) {
    return writePosDeleteFile(table, 0L);
  }

  /**
   * Writes a delete file containing a single position delete at {@code pos} within the
   * first data file added by the table's current snapshot.
   */
  private DeleteFile writePosDeleteFile(Table table, long pos) {
    // Target the first data file added by the current snapshot.
    DataFile targetFile =
        Iterables.getFirst(table.currentSnapshot().addedDataFiles(table.io()), null);
    PartitionSpec targetSpec = table.specs().get(targetFile.specId());
    StructLike targetPartition = targetFile.partition();

    // Delete the single row at the requested position within that file.
    PositionDelete<InternalRow> positionDelete = PositionDelete.create();
    positionDelete.set(targetFile.path(), pos, null);

    return writePositionDeletes(
        table, targetSpec, targetPartition, ImmutableList.of(positionDelete));
  }

  /**
   * Writes an equality delete file removing rows whose "data" column equals {@code
   * dataValue}, written into the partition with value 1.
   */
  private DeleteFile writeEqDeleteFile(Table table, String dataValue) {
    List<Record> deletes = Lists.newArrayList();
    Schema deleteRowSchema = SCHEMA.select("data");
    Record delete = GenericRecord.create(deleteRowSchema);
    deletes.add(delete.copy("data", dataValue));
    try {
      return FileHelpers.writeDeleteFile(
          table,
          Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
          org.apache.iceberg.TestHelpers.Row.of(1),
          deletes,
          deleteRowSchema);
    } catch (IOException e) {
      // Wrap as UncheckedIOException for consistency with writePositionDeletes; this is
      // still a RuntimeException, so existing callers are unaffected.
      throw new UncheckedIOException(e);
    }
  }

  /** Returns the sum of {@link DataFile#fileSizeInBytes()} over the given files. */
  private long totalSizeInBytes(Iterable<DataFile> dataFiles) {
    // Stream the iterable directly instead of materializing an intermediate ArrayList.
    return StreamSupport.stream(dataFiles.spliterator(), false)
        .mapToLong(DataFile::fileSizeInBytes)
        .sum();
  }

  /**
   * Asserts that {@code dataFiles} has one file per expected partition id and that each
   * file's first partition field matches the id at the same index.
   */
  private void assertDataFilePartitions(
      List<DataFile> dataFiles, List<Integer> expectedPartitionIds) {
    assertThat(dataFiles)
        .as("Table should have " + expectedPartitionIds.size() + " data files")
        .hasSameSizeAs(expectedPartitionIds);

    int index = 0;
    for (DataFile dataFile : dataFiles) {
      Integer expectedId = expectedPartitionIds.get(index);
      assertThat(dataFile.partition().get(0, Integer.class).intValue())
          .as("Data file should have partition of id " + expectedId)
          .isEqualTo(expectedId.intValue());
      index += 1;
    }
  }
}
